﻿/**
* CRL
*/
using CRL.Core.Extension;
using CRL.RabbitMQ;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace CRL.EventBus.Queue
{
    public class DirectRabbitMQEx : DirectRabbitMQ
    {
        public DirectRabbitMQEx(ConnectionConfig config, string exchangeName) : base(config, exchangeName)
        {

        }
    }
    internal class RabbitMQConfig : ConfigBase
    {
        internal Action<ConnectionConfig> configFunc;
        internal Action<ConsumeOption> consumeOption;
        internal string DefaultQueueName = "EventBusQueue.116";
    }
    class RabbitMQ : AbsQueue
    {
        //require rabbitmq-server-3.9.13 rabbitmq_delayed_message_exchange-3.9.0
        DirectRabbitMQ client;
        DelayRabbitMQ delayClient;
        static DelayRabbitMQ errorQueueClient;
        RabbitMQConfig _queueConfig;
        string exchangeName = "CRLEventBusExc";
        //string Name { get; set; }
        //bool _async;
        public override MQType MQType => MQType.RabbitMQ;
        string errorRepeatDelayQueue = "eventBusErrorSend";
        string convertData(byte[] data)
        {
            return Encoding.UTF8.GetString(data);
        }
        public RabbitMQ(ConfigBase queueConfig, bool async)
        {
            //_async = async;
            _queueConfig = (RabbitMQConfig)queueConfig;
            var config = new ConnectionConfig();
            _queueConfig.configFunc.Invoke(config);
            config.ConsumersAsync = async;
            client = new DirectRabbitMQEx(config, exchangeName);
            #region 异常重发队列
            if (errorQueueClient == null)
            {
                var config2 = config.ToType<ConnectionConfig>();
                config2.ConsumersAsync = false;
                errorQueueClient = new DelayRabbitMQ(config2, "errorQueueExc");
                errorQueueClient.BeginReceive(errorRepeatDelayQueue, errorRepeatDelayQueue, (msg, key) =>
                    {
                        //延迟队列转发原始消息
                        var msgPackage = convertData(msg).ToObject<MsgPackage>();
                        client.Publish(msgPackage.OriginRoutingKey, new MsgPackage
                        {
                            RoutingKey = msgPackage.OriginRoutingKey,
                            _InnerData = msgPackage._InnerData,
                            DelayMs = msgPackage.DelayMs,
                            Priority = msgPackage.Priority,
                            RetryTimes = msgPackage.RetryTimes,
                            OriginRoutingKey = "",
                            MsgKey = msgPackage.MsgKey,
                            Time = msgPackage.Time
                        });
                    }, new ConsumeOption { });
            }
            #endregion
            //client.ConsumerChannelFunc = queueConfig.ConsumerChannelFunc;
        }
        void checkDelayClient()
        {
            if (delayClient == null)
            {
                var config = new ConnectionConfig();
                _queueConfig.configFunc.Invoke(config);
                delayClient = new DelayRabbitMQ(config, exchangeName + "_delay");
            }
        }

        public override void PublishList(string routingKey, IEnumerable<object> msgs, Action<PublishOption> optFunc = null)
        {
            if (string.IsNullOrEmpty(routingKey))
            {
                routingKey = msgs.First().GetType().Name;
            }
            var data = msgs.ToJson();
            var opt = new PublishOption();
            optFunc?.Invoke(opt);
            var delayMs = opt.DelaySecond > 0 ? opt.DelaySecond * 1000 : opt.DelayMs;
            if (delayMs > 0)
            {
                checkDelayClient();
                delayClient.Publish(routingKey, delayMs, b =>
                {
                    b.Persistent = true;
                    b.Priority = (byte)opt.Priority;
                }, new MsgPackage { _InnerData = data, RoutingKey = routingKey, DelayMs = delayMs, MsgKey = opt.MsgKey });
                return;
            }
            client.Publish(routingKey, b =>
            {
                b.Persistent = true;
                b.Priority = (byte)opt.Priority;
            }, new MsgPackage { _InnerData = data, RoutingKey = routingKey, MsgKey = opt.MsgKey });
        }
        public override Task PublishListAsync(string routingKey, IEnumerable<object> msgs, Action<PublishOption> optFunc = null)
        {
            PublishList(routingKey, msgs, optFunc);
            return Task.CompletedTask;
        }
        public override void DeleteData(IData data)
        {
            var msgPackage = data as MsgPackage;
            var ed = SubscribeService.GetEventDeclare(data.RoutingKey);
            Log(ed, $"重复发送已终止 {msgPackage.ToJson()}", $"{ed.Name}.delete");
            SubscribeService.OnEventDataRemove(msgPackage);
        }
        public override void RePublish(IData data, object msg)
        {
            //使用默认ack无法控制重发间隔，但会保持重发消息顺序性(确认？)
            //使用延迟队列解决了重发间隔问题，但无法保持重发后的顺序性
            var ed = SubscribeService.GetEventDeclare(data.RoutingKey);
            var attr = ed.SubscribeAttribute;
            //发送到延迟队列
            var delayMs = attr.GetDelayTime(data);
            if (delayMs < 0)
            {
                return;
            }
            errorQueueClient.Publish(errorRepeatDelayQueue, delayMs, b =>
            {
                b.Persistent = true;
                b.Priority = (byte)data.Priority;
            }, new MsgPackage
            {
                _InnerData = msg.ToJson(),
                OriginRoutingKey = data.RoutingKey,
                RoutingKey = errorRepeatDelayQueue,
                RetryTimes = data.RetryTimes + 1,
                DelayMs = data.DelayMs,
                Priority = data.Priority,
                MsgKey = data.MsgKey,
                Time = data.Time
            });
        }

        public override void Subscribe(EventDeclare eventDeclare)
        {
            var attr = eventDeclare.SubscribeAttribute;
            var queueName = _queueConfig.DefaultQueueName;
            if (!string.IsNullOrEmpty(attr.QueueName))
            {
                queueName = attr.QueueName;
            }
            var routingKey = eventDeclare.Name;
            var consumeOption = new ConsumeOption();
            _queueConfig.consumeOption?.Invoke(consumeOption);
            //consumeOption.ConsumerRetryTimes = attr.RetryTimes;
            consumeOption.LazyConsume = true;
            consumeOption.QueueDeclareArgs = new Dictionary<string, object>() { { "x-max-priority", 10 } };
            if (attr.DelayQueue)
            {
                //client?.Dispose();
                checkDelayClient();
                delayClient.BeginReceive(queueName + "_delay", routingKey, (msg, key) =>
                {
                    OnReceiveString(convertData(msg), key);
                }, consumeOption);
                return;
            }
            //同步订阅
            client.BeginReceive(queueName, routingKey, (msg, key) =>
            {
                OnReceiveString(convertData(msg), key);
            }, consumeOption);
        }

        public override void SubscribeAsync(EventDeclare eventDeclare)
        {
            var attr = eventDeclare.SubscribeAttribute;
            var queueName = _queueConfig.DefaultQueueName;
            if (!string.IsNullOrEmpty(attr.QueueName))
            {
                queueName = attr.QueueName;
            }
            var routingKey = eventDeclare.Name;
            var consumeOption = new ConsumeOption();
            _queueConfig.consumeOption?.Invoke(consumeOption);
            //consumeOption.ConsumerRetryTimes = attr.RetryTimes;
            consumeOption.LazyConsume = true;
            consumeOption.QueueDeclareArgs = new Dictionary<string, object>() { { "x-max-priority", 10 } };

            if (attr.DelayQueue)
            {
                //client?.Dispose();
                checkDelayClient();
                delayClient.BeginReceiveAsync(queueName + "_delay", routingKey, (msg, key) =>
                {
                    return OnReceiveAsync(convertData(msg), key);
                }, consumeOption);
                return;
            }
            //异步订阅
            client.BeginReceiveAsync(queueName, routingKey, (msg, key) =>
            {
                return OnReceiveAsync(convertData(msg), key);
            }, consumeOption);
        }

        public override void Dispose()
        {
            client?.Dispose();
            delayClient?.Dispose();
        }
        public override long CleanQueue(string name)
        {
            return client.CleanQueue(name);
        }
        public override long GetQueueLength(string name)
        {
            return client.GetQueueLength(name);
        }
        public override void ConfirmConsume()
        {
            client.ConfirmConsume();
            delayClient?.ConfirmConsume();
        }
    }

    //    class RabbitMQPublisher : AbsPublisher
    //    {
    //#if NETSTANDARD 
    //        public RabbitMQPublisher(Microsoft.Extensions.Options.IOptions<QueueConfig> options) : base(options.Value)
    //        {
    //            queue = QueueFactory.CreateClient(options.Value.GetMqSetting(MQType.RabbitMQ), false);
    //            //queueConfig = options.Value;
    //        }
    //#endif
    //        public RabbitMQPublisher(QueueConfig _queueConfig) : base(_queueConfig)
    //        {
    //            queue = QueueFactory.CreateClient(_queueConfig.GetMqSetting(MQType.RabbitMQ), false);
    //        }
    //    }
}
